package wsz.global;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * 全局有序消息
 * @author wsz
 * @date 2022/2/28 10:43
 **/
public class GlobalOrderConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_wsz");
        consumer.setNamesrvAddr("192.168.174.138:9876");
        consumer.subscribe("tp_demo_08", "*");
        // 一个消费者线程处理，牺牲了并发性能来保证消息全局有序
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.setPullBatchSize(1);
        consumer.setConsumeMessageBatchMaxSize(1);

        consumer.setMessageListener((MessageListenerOrderly) (msgs, context) -> {
            for (MessageExt messageExt : msgs) {
                System.out.println(messageExt);
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        consumer.start();
    }
}
